-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add IcebergDocument as one implementation of VirtualDocument #3147
base: master
Are you sure you want to change the base?
Conversation
|
||
// Register a shutdown hook to delete the file when the JVM exits | ||
sys.addShutdownHook { | ||
withWriteLock { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need a write lock, do we allow multiple writers to write the file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is removed
override val bufferSize: Int = 1024 | ||
|
||
// Register a shutdown hook to delete the file when the JVM exits | ||
sys.addShutdownHook { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lifecycle for this file is also not correct. This file is created by the computing unit JVM, which can be killed right after the execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest we do a global cleanup on OpResultStorage
level on top of #3146.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is removed.
6522779
to
a83d779
Compare
1edb551
to
cef347b
Compare
catalog: | ||
jdbc: # currently we only support storing catalog info via jdbc, i.e. https://iceberg.apache.org/docs/1.7.1/jdbc/ | ||
url: "jdbc:mysql://0.0.0.0:3306/texera_iceberg?serverTimezone=UTC" | ||
username: "root" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sure to clean up those username and passwords.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cleaned
import scala.jdk.CollectionConverters._ | ||
|
||
class IcebergDocument[T >: Null <: AnyRef]( | ||
val catalog: Catalog, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the catalog
should be created/retrieved inside IcebergDocument
to make it self-contained.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed. I added a singleton catalog instance and all IcebergDocuments will use that instance as the catalog.
# Conflicts: # core/amber/src/main/scala/edu/uci/ics/texera/workflow/WorkflowCompiler.scala # core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala # core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/managed/ProgressiveSinkOpExec.scala
# Conflicts: # core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/OpResultStorage.scala
… into jiadong-add-file-result-storage
# Conflicts: # core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/SpecialPhysicalOpFactory.scala # core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/sink/ProgressiveSinkOpExec.scala
This reverts commit a2e53b5.
This PR introduces an implementation of result storage using Apache Iceberg.
How to enable the Iceberg result storage
Go to
storage-config.yaml
,result-storage-mode
toiceberg
storage.iceberg.catalog.jdbc
section,make sure the JDBC is accessible via the url, username, and password
Major changes
IcebergDocument
: a thread-safe implementation ofVirtualDocument
for storing and reading results in Iceberg tables.IcebergTableWriter
: an append-only writer for Iceberg tables with configurable buffer size.storage.iceberg
to specify catalog and table settings.Introduced Dependencies
In workflow-core, some new packages are added
although the file is not of type HadoopOutputFile, it still creats a Hadoop Configuration() as the placeholder. During the runtime, we don't have any dependency on Hadoop or HDFS.
Overview of the behavior IcebergDocument and IcebergWriter
IcebergDocument:
IcebergTableWriter:
How the result will be stored via Iceberg tables
key
, a table namedkey
will be created.key
, each worker will append immutable parquet files to the table's data space usingIcebergTableWriter
. To avoid the parquet filename collision, each worker will prefix its created file with ${workerIndex}_${fileIndex}, in whichworkerIndex
is its index, andfileIndex
is a number maintained that increased by 1 every time a new data file is created and flushed by the writer.IcebergDocument.get
. This iterator can incrementally read new data while writers are appending tuples.